
package com.shiku.imserver.service;


import com.alibaba.fastjson.JSONObject;
import com.shiku.imserver.CoreService;
import com.shiku.imserver.common.AbstractService;
import com.shiku.imserver.common.ImConfig;
import com.shiku.imserver.common.constant.KConstants;
import com.shiku.imserver.common.message.ChatMessage;
import com.shiku.utils.StringUtil;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Publishes IM server events to RocketMQ.
 *
 * <p>Two producers are kept, each created lazily on first use:
 * <ul>
 *   <li>group {@code userStatusProducer}, used for messages on topic {@code userStatusMessage}
 *       (sent from {@link #handleLogin(String)} and {@link #closeConnection(String)});</li>
 *   <li>group {@code pushProducer}, used for messages on topic {@code pushMessage}
 *       (sent from {@link #offMessagePush(ChatMessage)}).</li>
 * </ul>
 *
 * <p>Producer creation and replacement are serialized on a private lock and the producer
 * fields are {@code volatile}, so the getters may be called from several threads.
 * All publishing methods are best-effort: failures are logged, never propagated as
 * checked exceptions.
 */
public class RocketmqService
        extends AbstractService {
    public static Logger logger = LoggerFactory.getLogger(RocketmqService.class);

    private static final String USER_STATUS_GROUP = "userStatusProducer";
    private static final String PUSH_GROUP = "pushProducer";
    private static final String USER_STATUS_TOPIC = "userStatusMessage";
    private static final String PUSH_TOPIC = "pushMessage";

    /** Status flag placed in the payload sent by {@link #handleLogin(String)}. */
    private static final char STATUS_FLAG_LOGIN = '\001';

    /** Status flag placed in the payload sent by {@link #closeConnection(String)}. */
    private static final char STATUS_FLAG_CLOSE = Character.MIN_VALUE;

    /** Pause after {@code start()}, kept from the original implementation. */
    private static final long START_SETTLE_MILLIS = 500L;

    /** Guards creation and replacement of both producers. */
    private final Object producerLock = new Object();

    private volatile ImConfig.MQConfig mqConfig;

    private volatile DefaultMQProducer userStatusProducer;
    private volatile DefaultMQProducer pushProducer;


    /**
     * Loads the MQ configuration and eagerly creates both producers.
     *
     * @return always {@code false}
     */
    @Override
    public boolean initialize() {
        this.mqConfig = IMBeanUtils.getImconfig().getMqConfig();

        getUserStatusProducer();
        getPushProducer();

        // NOTE(review): the original returned false unconditionally; the meaning of the return
        // value is defined by AbstractService, which is not visible here, so it is left as-is.
        return false;
    }


    /**
     * Returns the producer for group {@code userStatusProducer}, creating and starting it on
     * first use.
     *
     * @return the producer; never {@code null}, but it may not be running if starting it failed
     *         (the failure is logged and {@link #restartProducer(DefaultMQProducer)} can replace it)
     */
    public DefaultMQProducer getUserStatusProducer() {
        DefaultMQProducer current = this.userStatusProducer;
        if (null != current) {
            return current;
        }
        synchronized (this.producerLock) {
            // Re-check under the lock so only one thread creates the producer.
            if (null == this.userStatusProducer) {
                this.userStatusProducer =
                        buildAndStart(USER_STATUS_GROUP, nameServerAddress(), USER_STATUS_TOPIC);
            }
            return this.userStatusProducer;
        }
    }


    /**
     * Returns the producer for group {@code pushProducer}, creating and starting it on first use.
     *
     * @return the producer; never {@code null}, but it may not be running if starting it failed
     *         (the failure is logged and {@link #restartProducer(DefaultMQProducer)} can replace it)
     */
    public DefaultMQProducer getPushProducer() {
        DefaultMQProducer current = this.pushProducer;
        if (null != current) {
            return current;
        }
        synchronized (this.producerLock) {
            // Re-check under the lock so only one thread creates the producer.
            if (null == this.pushProducer) {
                this.pushProducer = buildAndStart(PUSH_GROUP, nameServerAddress(), null);
            }
            return this.pushProducer;
        }
    }


    /**
     * Tries to bring a producer back into a usable state.
     *
     * <ul>
     *   <li>{@code null}: logged and ignored.</li>
     *   <li>State {@code CREATE_JUST}: {@code start()} is called on it.</li>
     *   <li>State {@code RUNNING}: nothing is done.</li>
     *   <li>Any other state: if the producer is one of the two held by this service, it is shut
     *       down and a freshly started producer with the same group and name server address is
     *       stored in its place, so later getter calls return the replacement. Producers not
     *       owned by this service are left untouched because a replacement could not be handed
     *       back to the caller.</li>
     * </ul>
     *
     * @param producer the producer to restart; may be {@code null}
     */
    public void restartProducer(DefaultMQProducer producer) {
        if (null == producer) {
            logger.warn(" restartProducer ===》 producer is null, nothing to restart");
            return;
        }

        String producerGroup = producer.getProducerGroup();
        logger.info(" restartProducer ===》 {}", producerGroup);

        try {
            synchronized (this.producerLock) {
                ServiceState state = null == producer.getDefaultMQProducerImpl()
                        ? null
                        : producer.getDefaultMQProducerImpl().getServiceState();

                if (ServiceState.CREATE_JUST == state) {
                    producer.start();
                    return;
                }
                if (ServiceState.RUNNING == state) {
                    return;
                }

                // Identity comparison: only swap the field that still holds this exact instance,
                // so a concurrent caller that already replaced it is not overwritten.
                if (producer == this.userStatusProducer) {
                    this.userStatusProducer = replaceProducer(producer, USER_STATUS_TOPIC);
                } else if (producer == this.pushProducer) {
                    this.pushProducer = replaceProducer(producer, null);
                } else {
                    logger.warn(" restartProducer ===》 {} in state {} is not held by this service, skipped",
                            producerGroup, state);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }


    /**
     * Publishes a user-status message for a connection that has logged in.
     *
     * <p>Nothing is sent when user-status pushing is disabled in the configuration, when the
     * parsed user id is {@code 0}, or when the parsed resource is {@code null} or empty.
     *
     * @param connStr connection string from which the user id and resource are parsed via
     *                {@link CoreService}
     */
    public void handleLogin(String connStr) {
        try {
            String resource = CoreService.parseResource(connStr);
            String userIdStr = CoreService.parseBareUserId(connStr);

            logger.info("handleLogin == userId ====> {} resource {}", userIdStr, resource);

            if (!IMBeanUtils.getImconfig().isPushUserStatus()) {
                return;
            }

            long userId = Long.parseLong(userIdStr);
            if (0L == userId) {
                logger.info("handleLogin ====> userId  is zero ");
                return;
            }

            if (null == resource || "".equals(resource)) {
                logger.info("handleLogin ====> getResource  is null ");
                return;
            }

            publishUserStatus(userId, STATUS_FLAG_LOGIN, resource);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }


    /**
     * Publishes a user-status message for a connection that has been closed.
     *
     * <p>Nothing is sent when user-status pushing is disabled in the configuration, when the
     * parsed user id is empty, the literal {@code "null"}, not a number or {@code 0}, or when the
     * parsed resource is {@code null}, empty or the literal {@code "null"}.
     *
     * @param connStr connection string from which the user id and resource are parsed via
     *                {@link CoreService}
     */
    public void closeConnection(String connStr) {
        try {
            String resource = CoreService.parseResource(connStr);
            String userIdStr = CoreService.parseBareUserId(connStr);

            logger.info("closeConnection == userId ====> {} resource {}", userIdStr, resource);

            if (!IMBeanUtils.getImconfig().isPushUserStatus()) {
                return;
            }

            if (StringUtil.isEmpty(userIdStr) || "null".equals(userIdStr) || null == resource) {
                return;
            }

            long userId;
            try {
                userId = Long.parseLong(userIdStr);
            } catch (NumberFormatException e) {
                logger.error(e.getMessage(), e);
                return;
            }

            if (0L == userId) {
                if (KConstants.isDebug) {
                    logger.info("closeConnection ====> userId  is zero ");
                }
                return;
            }

            // The original tested "null".equals(userIdStr) here, which the guard above already
            // covers; the resource is what this check is about.
            if (StringUtil.isEmpty(resource) || "null".equals(resource)) {
                if (KConstants.isDebug) {
                    logger.info("closeConnection ====> getResource  is null ");
                }
                return;
            }

            publishUserStatus(userId, STATUS_FLAG_CLOSE, resource);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }


    /**
     * Publishes a chat message as JSON on topic {@code pushMessage}.
     *
     * <p>Messages of type 26, 27, 200 and 201 are not published.
     * NOTE(review): the meaning of these type codes is not defined in this file.
     *
     * @param chatMessage the message to publish
     */
    public void offMessagePush(ChatMessage chatMessage) {
        int contextType = chatMessage.getType();
        if (26 == contextType || 27 == contextType || 200 == contextType || 201 == contextType) {
            return;
        }

        JSONObject jsonBody = new JSONObject();
        jsonBody.put("type", Short.valueOf(chatMessage.getType()));
        jsonBody.put("content", chatMessage.getContent());
        jsonBody.put("fromUserId", chatMessage.getFromUserId());
        jsonBody.put("toUserId", chatMessage.getToUserId());
        jsonBody.put("fromUserName", chatMessage.getFromUserName());
        jsonBody.put("toUserName", chatMessage.getToUserName());
        jsonBody.put("objectId", chatMessage.getObjectId());
        jsonBody.put("fileName", chatMessage.getFileName());
        jsonBody.put("isReadDel", Integer.valueOf(chatMessage.isReadDel() ? 1 : 0));
        jsonBody.put("isEncrypt", Integer.valueOf(chatMessage.isEncrypt() ? 1 : 0));

        // Chat type 2 is treated as a group message: the head's "to" is sent as the room JID.
        boolean group = 2 == chatMessage.getMessageHead().getChatType();
        if (group) {
            jsonBody.put("roomJid", chatMessage.getMessageHead().getTo());
        }
        jsonBody.put("isGroup", Boolean.valueOf(group));

        Message message;
        try {
            message = new Message(PUSH_TOPIC, jsonBody.toJSONString().getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return;
        }

        sendWithRecovery(getPushProducer(), message);
    }


    /** Returns the configured name server address, loading the MQ config if it is not set yet. */
    private String nameServerAddress() {
        ImConfig.MQConfig config = this.mqConfig;
        if (null == config) {
            config = IMBeanUtils.getImconfig().getMqConfig();
            this.mqConfig = config;
        }
        return config.getNameAddr();
    }


    /**
     * Creates a producer for the given group and tries to start it; a start failure is logged
     * and the (not running) producer is still returned.
     */
    private DefaultMQProducer buildAndStart(String group, String namesrvAddr, String createTopicKey) {
        logger.info("{} init  {}", group, namesrvAddr);

        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        if (null != createTopicKey) {
            producer.setCreateTopicKey(createTopicKey);
        }

        try {
            producer.start();
            Thread.sleep(START_SETTLE_MILLIS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("failed to start producer {}: {}", group, e.getMessage(), e);
        }
        return producer;
    }


    /** Shuts down a broken producer and returns a newly started one with the same group and address. */
    private DefaultMQProducer replaceProducer(DefaultMQProducer broken, String createTopicKey) {
        try {
            broken.shutdown();
        } catch (Exception e) {
            logger.warn("failed to shut down producer {}: {}", broken.getProducerGroup(), e.getMessage(), e);
        }
        return buildAndStart(broken.getProducerGroup(), broken.getNamesrvAddr(), createTopicKey);
    }


    /** Builds the {@code userId:flag:resource} payload and sends it on the user-status topic. */
    private void publishUserStatus(long userId, char statusFlag, String resource) {
        String payload = userId + ":" + statusFlag + ":" + resource;
        Message message = new Message(USER_STATUS_TOPIC, payload.getBytes(StandardCharsets.UTF_8));
        sendWithRecovery(getUserStatusProducer(), message);
    }


    /** Sends a message; logs a non-OK result and asks for a producer restart when sending throws. */
    private void sendWithRecovery(DefaultMQProducer producer, Message message) {
        try {
            SendResult result = producer.send(message);
            if (SendStatus.SEND_OK != result.getSendStatus()) {
                logger.warn("send to {} not OK: {}", message.getTopic(), result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            restartProducer(producer);
        }
    }

}


